package com.superid.entity;

import com.superid.schema.MySchema;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


/**
 * 所有需要用Avro发送的消息实体类都必须继承此类
 * @author dufeng
 * @create: 2018-08-08 11:12
 */
public abstract class AvroEntity {

    private static final Logger logger = LoggerFactory.getLogger(AvroEntity.class);

    public static final char UNDERLINE = '_';

    private static Map<Class, Schema> parsedSchemaCache = new ConcurrentHashMap<>();


    protected GenericRecord getAvroRecord() throws Exception {
        GenericRecord avroRecord = new GenericData.Record(getParsedSchema(this.getClass()));

        fillAvroRecord(avroRecord);
        return avroRecord;
    }

    /**
     * 把对象的属性填充到AvroRecord对象中
     *
     * @param avroRecord
     * @throws Exception
     */
    protected void fillAvroRecord(GenericRecord avroRecord) throws Exception {

        List<Field> fields = getNonStaticFields(this.getClass());
        for (Field f : fields) {
            String name = getSimpleFieldType(camelToUnderline(f.getName()));
            f.setAccessible(true);
            Object value = f.get(this);
            avroRecord.put(name, value);
        }

    }

    /**
     * 获取给定类的解析后的Schema信息
     *
     * @param clazz
     * @return
     */
    public static Schema getParsedSchema(Class clazz) {
        //缓存中如果有，直接从缓存中拿
        if (parsedSchemaCache.get(clazz) != null) {
            return parsedSchemaCache.get(clazz);
        }

        MySchema.Builder builder = new MySchema.Builder()
                .type(MySchema.RECORD_TYPE)
                .name(clazz.getSimpleName());
        for (Map.Entry<String, String> entry : getFieldNameAndType(clazz).entrySet()) {
            builder.field(entry.getKey(), entry.getValue());
        }
        Schema parsedSchema = builder.build().getParsedSchema();

        //加入缓存
        parsedSchemaCache.put(clazz, parsedSchema);

        return parsedSchema;
    }


    /**
     * 获取给定类的非静态字段列表
     *
     * @param clazz
     * @return
     */
    public static List<Field> getNonStaticFields(Class clazz) {
        Field[] fields = clazz.getDeclaredFields();

        List<Field> result = new ArrayList();
        for (Field f : fields) {
            if (Modifier.isStatic(f.getModifiers())) {
                continue;
            }
            result.add(f);
        }

        return result;
    }

    /**
     * 获取所有非静态字段的字段名字(下划线分隔)及其类型
     *
     * @param clazz
     * @return
     */
    public static Map<String, String> getFieldNameAndType(Class clazz) {
        List<Field> fields = getNonStaticFields(clazz);

        Map<String, String> result = new HashMap<>();
        for (Field f : fields) {
            String name = camelToUnderline(f.getName());
            String type = getSimpleFieldType(f.getType().getTypeName());
            result.put(name, type);
        }

        return result;
    }


    /**
     * 获取字段类型的简要名称
     *
     * @param fieldType
     * @return
     */
    public static String getSimpleFieldType(String fieldType) {
        if ("java.lang.String".equals(fieldType)) {
            return "string";
        } else if ("java.util.Map".equals(fieldType)) {
            return "map";
        } else {
            return fieldType;
        }
    }

    /**
     * 驼峰格式字符串转换为下划线格式字符串
     *
     * @param param
     * @return
     */
    public static String camelToUnderline(String param) {
        if (param == null || "".equals(param.trim())) {
            return "";
        }
        int len = param.length();
        StringBuilder sb = new StringBuilder(len);
        for (int i = 0; i < len; i++) {
            char c = param.charAt(i);
            if (Character.isUpperCase(c)) {
                sb.append(UNDERLINE);
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
